跳到主要内容

Zookeeper 客户端 Curator

参考资料 官方文档 What is Curator? 参考资料 Zookeeper客户端Curator使用详解

Curator 是什么

Curator 是 zookeeper 分布式协调服务的 java 客户端库,它包装了一系列操作 zk 的高级 API 和实用库,使得操作 zk 变得更加容易和可靠。例如使用原生 zk 的 API 实现分布式锁的话,代码量多,复杂,使用 Curator 后就相对简单的多,很多底层的 api 都直接封装好了,开箱即用,学习成本低。

配置环境

参考资料 Maven / Artifacts

Curator包含了几个包(curator-client 内置了 zookeeper,无需再额外引入了):

curator-framework:对 zookeeper 的底层 api 的一些封装 curator-client:提供一些客户端的操作,例如重试策略等 curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-client -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>5.1.0</version>
</dependency>

创建会话

静态工程方法创建客户端

// 重连机制
// new RetryOneTime(3000):每三秒重连一次,只重连一次
// new RetryNTimes(3, 3000):每每三秒重连一次,共重连3次
// new RetryUntilElapsed(10000, 3000):每三秒重连一次,10秒后停止重连
// new ExponentialBackoffRetry(1000, 3):重连3次,每次重连的间隔会越来越长
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

String connectionInfo = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

CuratorFramework client = CuratorFrameworkFactory.newClient(
connectionInfo,
5000,
3000,
retryPolicy);

newClient 静态工厂方法包含四个主要参数:

参数名说明
connectionString服务器列表,格式 host1:port1,host2:port2,...
retryPolicy重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口
sessionTimeoutMs会话超时时间,单位毫秒,默认60000ms
connectionTimeoutMs连接创建超时时间,单位毫秒,默认60000ms

使用 Fluent 风格的创建会话

核心参数变为流式设置,一个列子如下:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();

创建包含隔离命名空间的会话

为了实现不同的 Zookeeper 业务之间的隔离,需要为每个业务分配一个独立的命名空间(NameSpace),即指定一个 Zookeeper 的根路径

例如(下面的例子)当客户端指定了独立命名空间为 /base,那么该客户端对 Zookeeper 上的数据节点的操作都是基于该目录进行的。通过设置 Chroot 可以将客户端应用与 Zookeeper 服务端的一课子树相对应,在多个应用共用一个 Zookeeper 集群的场景下,这对于实现不同应用之间的相互隔离十分有意义。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("base")
.build();

启动客户端

当创建会话成功,得到client的实例然后可以直接调用其 start( ) 方法:

client.start();

数据节点操作

创建数据节点

Zookeeper 的节点创建模式:

  • PERSISTENT:持久化
  • PERSISTENT_SEQUENTIAL:持久化并且带序列号
  • EPHEMERAL:临时
  • EPHEMERAL_SEQUENTIAL:临时并且带序列号

创建一个节点,初始内容为空

client.create().forPath("path");

注意:如果没有设置节点属性,节点创建模式默认为持久化节点,内容默认为空

创建一个节点,附带初始化内容

client.create().forPath("path","init".getBytes());

创建一个节点,指定创建模式(临时节点),内容为空

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

创建一个节点,指定创建模式(临时节点),附带初始化内容

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点

client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("path","init".getBytes());

这个 creatingParentContainersIfNeeded() 接口非常有用,因为一般情况开发人员在创建一个子节点必须判断它的父节点是否存在,如果不存在直接创建会抛出 NoNodeException,使用 creatingParentContainersIfNeeded() 之后 Curator 能够自动递归创建所有所需的父节点。

删除数据节点

删除一个节点

client.delete().forPath("path");

注意,此方法只能删除叶子节点,否则会抛出异常。

删除一个节点,并且递归删除其所有的子节点

client.delete().deletingChildrenIfNeeded().forPath("path");

删除一个节点,强制指定版本进行删除

client.delete().withVersion(10086).forPath("path");

删除一个节点,强制保证删除

client.delete().guaranteed().forPath("path");

guaranteed() 接口是一个保障措施,只要客户端会话有效,那么 Curator 会在后台持续进行删除操作,直到删除节点成功。

注意:上面的多个流式接口是可以自由组合的,例如:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

读取数据节点数据

读取一个节点的数据内容

client.getData().forPath("path");

注意,此方法返的返回值是 byte[ ];

读取一个节点的数据内容,同时获取到该节点的 stat

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");

更新数据节点数据

更新一个节点的数据内容

client.setData().forPath("path","data".getBytes());

注意:该接口会返回一个 Stat 实例

更新一个节点的数据内容,强制指定版本进行更新

client.setData().withVersion(10086).forPath("path","data".getBytes());

检查节点是否存在

client.checkExists().forPath("path");

注意:该方法返回一个 Stat 实例,用于检查 ZNode 是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用 forPath( ) 指定要操作的 ZNode

获取某个节点的所有子节点路径

client.getChildren().forPath("path");

注意:该方法的返回值为 List<String>,获得 ZNode 的子节点 Path 列表。

可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用 forPath() 指定要操作的父 ZNode

事务

CuratorFramework 的实例包含 inTransaction( ) 接口方法,调用此方法开启一个 ZooKeeper 事务。 可以复合 create, setData, check, and/or delete 等操作然后调用 commit() 作为一个原子操作提交。一个例子如下:

client.inTransaction().check().forPath("path")
.and()
.create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
.and()
.setData().withVersion(10086).forPath("path","data2".getBytes())
.and()
.commit();

异步接口

上面提到的创建、删除、更新、读取等方法都是同步的,Curator 提供异步接口,引入了 BackgroundCallback 接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback 接口中一个重要的回调值为 CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。

CuratorEventType

事件类型对应CuratorFramework实例的方法
CREATE#create()
DELETE#delete()
EXISTS#checkExists()
GET_DATA#getData()
SET_DATA#setData()
CHILDREN#getChildren()
SYNC#sync(String,Object)
GET_ACL#getACL()
SET_ACL#setACL()
WATCHED#Watcher(Watcher)
CLOSING#close()

响应码(#getResultCode())

响应码意义
0OK,即调用成功
-4ConnectionLoss,即客户端与服务端断开连接
-110NodeExists,即节点已经存在
-112SessionExpired,即会话过期

一个异步创建节点的例子如下:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((curatorFramework, curatorEvent) -> {
System.out.println(String.format(
"eventType:%s,resultCode:%s",
curatorEvent.getType(),
curatorEvent.getResultCode()));
}, executor)
.forPath("path");

注意:如果 #inBackground() 方法不指定 executor,那么会默认使用 Curator 的 EventThread 去进行异步处理。

缓存(事件监听机制)

Zookeeper 原生支持通过注册 Watcher 来进行事件监听,但是开发者需要反复注册(Watcher 只能单次注册单次使用)。

Cache 是 Curator 中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。

Curator 提供了两种 Watcher(Cache) 来监听结点的变化。

1、NodeCache:只是监听某一个特定的节点,监听节点的新增和修改。

2、PathChildrenCache:监控一个 ZNode 的子节点. 当一个子节点增加、更新、删除时,Path Cache 会改变它的状态,会包含最新的子节点,子节点的数据和状态。

监视某个节点的数据变化

@Test
public void watcher1() throws Exception {
// 参数1:连接对象
// 参数2:监视的节点路径
final NodeCache nodeCache = new NodeCache(client, "/watcher1");
// 启动解释器对象
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
// 节点变化时回调的方法
@Override
public void nodeChanged() throws Exception {
System.out.println(nodeCache.getCurrentData().getPath());
System.out.println(new String(nodeCache.getCurrentData().getData()));
System.out.println(nodeCache.getCurrentData().getStat());
}
});
TimeUnit.SECONDS.sleep(30);
// 关闭监视器对象
nodeCache.close();
}

监视某子节点的数据变化

@Test
public void watcher2() throws Exception {
// 参数1:连接对象
// 参数2:监视的节点路径
// 参数3:事件中是否可以获取节点的数据
final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/watcher1", true);
// 启动解释器对象
pathChildrenCache.start();
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
// 当子节点方法变化时回调的方法
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
// 节点的事件类型
System.out.println(pathChildrenCacheEvent.getType());
// 节点的路径
System.out.println(pathChildrenCacheEvent.getData().getPath());
// 节点的数据
System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
}
});
TimeUnit.SECONDS.sleep(30);
// 关闭监视器对象
pathChildrenCache.close();
}

分布式锁

排他锁

@Test
public void lock1() throws Exception {
// 参数1:连接对象
// 参数2:节点路径
InterProcessLock interProcessLock = new InterProcessMutex(client, "/lock1");
System.out.println("等待获取锁对象");

// 获取锁
interProcessLock.acquire();
for (int i = 0; i < 5; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println(i);
}

System.out.println("等待释放锁");
// 释放锁
interProcessLock.release();
}

读写锁

读锁

@Test
public void lock2() throws Exception {
// 读写锁
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/lock1");
// 获取读锁对象
InterProcessMutex interProcessMutex = readWriteLock.readLock();
System.out.println("等待获取锁对象");
// 获取锁
interProcessMutex.acquire();
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println(i);
}
System.out.println("等待释放锁");
// 释放锁
interProcessMutex.release();
}

写锁

@Test
public void lock3() throws Exception {
// 读写锁
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/lock1");
// 获取读锁对象
InterProcessMutex interProcessMutex = readWriteLock.writeLock();
System.out.println("等待获取锁对象");
// 获取锁
interProcessMutex.acquire();
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println(i);
}
System.out.println("等待释放锁");
// 释放锁
interProcessMutex.release();
}